Fixing Flink's failure to save checkpoints in a test environment


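Problem

Flink deployed in the test environment cannot store checkpoints successfully, and triggering a savepoint with the flink command fails as well. The corresponding directory is created in HDFS, but no files are ever written to it.

The Flink web console shows the job's checkpoint stuck in the IN_PROGRESS state.

Running flink savepoint (with DEBUG level enabled in log4j-cli.properties) shows the client polling the status repeatedly; it stays IN_PROGRESS until the request times out.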

2021-11-19 08:34:29,317 DEBUG org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufUtil   [] - -Dio.netty.allocator.type: pooled

2021-11-19 08:34:29,317 DEBUG org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufUtil   [] - -Dio.netty.threadLocalDirectBufferSize: 0

2021-11-19 08:34:29,317 DEBUG org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufUtil   [] - -Dio.netty.maxThreadLocalCharBufferSize: 16384

2021-11-19 08:34:29,329 INFO  org.apache.flink.client.cli.CliFrontend                      [] - Waiting for response...

Waiting for response...

2021-11-19 08:34:29,377 DEBUG org.apache.flink.shaded.netty4.io.netty.util.Recycler        [] - -Dio.netty.recycler.maxCapacityPerThread: 4096

2021-11-19 08:34:29,377 DEBUG org.apache.flink.shaded.netty4.io.netty.util.Recycler        [] - -Dio.netty.recycler.maxSharedCapacityFactor: 2

2021-11-19 08:34:29,377 DEBUG org.apache.flink.shaded.netty4.io.netty.util.Recycler        [] - -Dio.netty.recycler.linkCapacity: 16

2021-11-19 08:34:29,377 DEBUG org.apache.flink.shaded.netty4.io.netty.util.Recycler        [] - -Dio.netty.recycler.ratio: 8

2021-11-19 08:34:29,916 DEBUG org.apache.flink.runtime.rest.RestClient                     [] - Received response {"request-id":"2acc7bbbc0ef3a19a595ffeb85c1706a"}.

2021-11-19 08:34:29,981 DEBUG org.apache.flink.runtime.rest.RestClient                     [] - Sending request of class class org.apache.flink.runtime.rest.messages.EmptyRequestBody to node24.qlteacher.com:8081/v1/jobs/9e97360ca975514b4a91369b05431267/savepoints/2acc7bbbc0ef3a19a595ffeb85c1706a

2021-11-19 08:34:30,011 DEBUG org.apache.flink.runtime.rest.RestClient                     [] - Received response {"status":{"id":"IN_PROGRESS"},"operation":null}.

2021-11-19 08:34:30,042 DEBUG org.apache.flink.runtime.rest.RestClient                     [] - Sending request of class class org.apache.flink.runtime.rest.messages.EmptyRequestBody to node24.qlteacher.com:8081/v1/jobs/9e97360ca975514b4a91369b05431267/savepoints/2acc7bbbc0ef3a19a595ffeb85c1706a

2021-11-19 08:34:30,059 DEBUG org.apache.flink.runtime.rest.RestClient                     [] - Received response {"status":{"id":"IN_PROGRESS"},"operation":null}.

2021-11-19 08:34:30,081 DEBUG org.apache.flink.runtime.rest.RestClient                     [] - Sending request of class class org.apache.flink.runtime.rest.messages.EmptyRequestBody to node24.qlteacher.com:8081/v1/jobs/9e97360ca975514b4a91369b05431267/savepoints/2acc7bbbc0ef3a19a595ffeb85c1706a

2021-11-19 08:34:30,094 DEBUG org.apache.flink.runtime.rest.RestClient                     [] - Received response {"status":{"id":"IN_PROGRESS"},"operation":null}.

2021-11-19 08:34:30,135 DEBUG org.apache.flink.runtime.rest.RestClient                     [] - Sending request of class class org.apache.flink.runtime.rest.messages.EmptyRequestBody to node24.qlteacher.com:8081/v1/jobs/9e97360ca975514b4a91369b05431267/savepoints/2acc7bbbc0ef3a19a595ffeb85c1706a

2021-11-19 08:34:30,149 DEBUG org.apache.flink.runtime.rest.RestClient                     [] - Received response {"status":{"id":"IN_PROGRESS"},"operation":null}.

2021-11-19 08:34:30,230 DEBUG org.apache.flink.runtime.rest.RestClient                     [] - Sending request of class class org.apache.flink.runtime.rest.messages.EmptyRequestBody to node24.qlteacher.com:8081/v1/jobs/9e97360ca975514b4a91369b05431267/savepoints/2acc7bbbc0ef3a19a595ffeb85c1706a

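The JobManager log in the Flink web console (DEBUG level enabled in log4j.properties, with %t added to the pattern so the thread name is printed) shows "Triggering checkpoint 1 (type=SAVEPOINT) @ 1637282069853 for job 9e97360ca975514b4a91369b05431267." After the checkpoint is triggered, no further related log lines or exceptions appear.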

2021-11-19 08:34:29,723 DEBUG flink-rest-server-netty-worker-thread-1 org.apache.flink.shaded.netty4.io.netty.util.Recycler        [] - -Dio.netty.recycler.maxCapacityPerThread: 4096

2021-11-19 08:34:29,724 DEBUG flink-rest-server-netty-worker-thread-1 org.apache.flink.shaded.netty4.io.netty.util.Recycler        [] - -Dio.netty.recycler.maxSharedCapacityFactor: 2

2021-11-19 08:34:29,724 DEBUG flink-rest-server-netty-worker-thread-1 org.apache.flink.shaded.netty4.io.netty.util.Recycler        [] - -Dio.netty.recycler.linkCapacity: 16

2021-11-19 08:34:29,724 DEBUG flink-rest-server-netty-worker-thread-1 org.apache.flink.shaded.netty4.io.netty.util.Recycler        [] - -Dio.netty.recycler.ratio: 8

2021-11-19 08:34:29,850 INFO  flink-akka.actor.default-dispatcher-19 org.apache.flink.runtime.jobmaster.JobMaster                 [] - Triggering savepoint for job 9e97360ca975514b4a91369b05431267.

2021-11-19 08:34:29,880 INFO  Checkpoint Timer org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Triggering checkpoint 1 (type=SAVEPOINT) @ 1637282069853 for job 9e97360ca975514b4a91369b05431267.

2021-11-19 08:34:38,958 DEBUG flink-akka.actor.default-dispatcher-2 org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Trigger heartbeat request.

2021-11-19 08:34:38,958 DEBUG flink-akka.actor.default-dispatcher-2 org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Trigger heartbeat request.

2021-11-19 08:34:38,958 DEBUG flink-akka.actor.default-dispatcher-17 org.apache.flink.runtime.jobmaster.JobMaster                 [] - Received heartbeat request from 86c79c1b6c206f760550c3773b560a98.

2021-11-19 08:34:38,959 DEBUG flink-akka.actor.default-dispatcher-2 org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Received heartbeat from f671b9c4e094cdf0975ea0ae43b50319.

2021-11-19 08:34:38,969 DEBUG flink-akka.actor.default-dispatcher-2 org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Received heartbeat from container_1637051640864_0050_01_000002.

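The same job checkpoints fine on the standalone Flink deployed on 10.0.11.66 and on the local development machine, but not on the YARN cluster deployed on 10.0.11.21-24. The suspicion was therefore that YARN, or something else in that environment, was responsible.

Troubleshooting process

In the Flink source, find where the last useful log line, "Triggering checkpoint", is emitted: org.apache.flink.runtime.checkpoint.CheckpointCoordinator#createPendingCheckpoint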

org.apache.flink.runtime.checkpoint.CheckpointCoordinator#createPendingCheckpoint

private PendingCheckpoint createPendingCheckpoint(
        long timestamp,
        CheckpointProperties props,
        CheckpointPlan checkpointPlan,
        boolean isPeriodic,
        long checkpointID,
        CheckpointStorageLocation checkpointStorageLocation,
        CompletableFuture onCompletionPromise) {
    synchronized (lock) {
        try {
            // since we haven't created the PendingCheckpoint yet, we need to check the
            // global state here.
            preCheckGlobalState(isPeriodic);
        } catch (Throwable t) {
            throw new CompletionException(t);
        }
    }
    final PendingCheckpoint checkpoint =
            new PendingCheckpoint(
                    job,
                    checkpointID,
                    timestamp,
                    checkpointPlan,
                    OperatorInfo.getIds(coordinatorsToCheckpoint),
                    masterHooks.keySet(),
                    props,
                    checkpointStorageLocation,
                    onCompletionPromise);
    trackPendingCheckpointStats(checkpoint);
    synchronized (lock) {
        pendingCheckpoints.put(checkpointID, checkpoint);
        ScheduledFuture cancellerHandle =
                timer.schedule(
                        new CheckpointCanceller(checkpoint),
                        checkpointTimeout,
                        TimeUnit.MILLISECONDS);
        if (!checkpoint.setCancellerHandle(cancellerHandle)) {
            // checkpoint is already disposed!
            cancellerHandle.cancel(false);
        }
    }
    LOG.info(
            "Triggering checkpoint {} (type={}) @ {} for job {}.",
            checkpointID,
            checkpoint.getProps().getCheckpointType(),
            timestamp,
            job);
    return checkpoint;
}

The "Triggering checkpoint" line was logged successfully, so this method is not the problem. Next, look at the caller: org.apache.flink.runtime.checkpoint.CheckpointCoordinator#startTriggeringCheckpoint

org.apache.flink.runtime.checkpoint.CheckpointCoordinator#startTriggeringCheckpoint

private void startTriggeringCheckpoint(CheckpointTriggerRequest request) {
    try {
        synchronized (lock) {
            preCheckGlobalState(request.isPeriodic);
        }
        // we will actually trigger this checkpoint!
        Preconditions.checkState(!isTriggering);
        isTriggering = true;
        final long timestamp = System.currentTimeMillis();
        CompletableFuture checkpointPlanFuture =
                checkpointPlanCalculator.calculateCheckpointPlan();
        final CompletableFuture pendingCheckpointCompletableFuture =
                checkpointPlanFuture
                        .thenApplyAsync(
                                plan -> {
                                    try {
                                        CheckpointIdAndStorageLocation
                                                checkpointIdAndStorageLocation =
                                                        initializeCheckpoint(
                                                                request.props,
                                                                request.externalSavepointLocation);
                                        return new Tuple2(
                                                plan, checkpointIdAndStorageLocation);
                                    } catch (Throwable e) {
                                        throw new CompletionException(e);
                                    }
                                },
                                executor)
                        .thenApplyAsync(
                                (checkpointInfo) ->
                                        createPendingCheckpoint(
                                                timestamp,
                                                request.props,
                                                checkpointInfo.f0,
                                                request.isPeriodic,
                                                checkpointInfo.f1.checkpointId,
                                                checkpointInfo.f1.checkpointStorageLocation,
                                                request.getOnCompletionFuture()),
                                timer);
        final CompletableFuture coordinatorCheckpointsComplete =
                pendingCheckpointCompletableFuture.thenComposeAsync(
                        (pendingCheckpoint) ->
                                OperatorCoordinatorCheckpoints
                                        .triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion(
                                                coordinatorsToCheckpoint,
                                                pendingCheckpoint,
                                                timer),
                        timer);
        // We have to take the snapshot of the master hooks after the coordinator checkpoints
        // has completed.
        // This is to ensure the tasks are checkpointed after the OperatorCoordinators in case
        // ExternallyInducedSource is used.
        final CompletableFuture masterStatesComplete =
                coordinatorCheckpointsComplete.thenComposeAsync(
                        ignored -> {
                            // If the code reaches here, the pending checkpoint is guaranteed to
                            // be not null.
                            // We use FutureUtils.getWithoutException() to make compiler happy
                            // with checked
                            // exceptions in the signature.
                            PendingCheckpoint checkpoint =
                                    FutureUtils.getWithoutException(
                                            pendingCheckpointCompletableFuture);
                            return snapshotMasterState(checkpoint);
                        },
                        timer);
        FutureUtils.assertNoException(
                CompletableFuture.allOf(masterStatesComplete, coordinatorCheckpointsComplete)
                        .handleAsync(
                                (ignored, throwable) -> {
                                    final PendingCheckpoint checkpoint =
                                            FutureUtils.getWithoutException(
                                                    pendingCheckpointCompletableFuture);
                                    Preconditions.checkState(
                                            checkpoint != null || throwable != null,
                                            "Either the pending checkpoint needs to be created or an error must have been occurred.");
                                    if (throwable != null) {
                                        // the initialization might not be finished yet
                                        if (checkpoint == null) {
                                            onTriggerFailure(request, throwable);
                                        } else {
                                            onTriggerFailure(checkpoint, throwable);
                                        }
                                    } else {
                                        if (checkpoint.isDisposed()) {
                                            onTriggerFailure(
                                                    checkpoint,
                                                    new CheckpointException(
                                                            CheckpointFailureReason
                                                                    .TRIGGER_CHECKPOINT_FAILURE,
                                                            checkpoint.getFailureCause()));
                                        } else {
                                            // no exception, no discarding, everything is OK
                                            final long checkpointId =
                                                    checkpoint.getCheckpointId();
                                            snapshotTaskState(
                                                    timestamp,
                                                    checkpointId,
                                                    checkpoint.getCheckpointStorageLocation(),
                                                    request.props,
                                                    checkpoint
                                                            .getCheckpointPlan()
                                                            .getTasksToTrigger());
                                            coordinatorsToCheckpoint.forEach(
                                                    (ctx) ->
                                                            ctx.afterSourceBarrierInjection(
                                                                    checkpointId));
                                            // It is possible that the tasks has finished
                                            // checkpointing at this point.
                                            // So we need to complete this pending checkpoint.
                                            if (!maybeCompleteCheckpoint(checkpoint)) {
                                                return null;
                                            }
                                            onTriggerSuccess();
                                        }
                                    }
                                    return null;
                                },
                                timer)
                        .exceptionally(
                                error -> {
                                    if (!isShutdown()) {
                                        throw new CompletionException(error);
                                    } else if (findThrowable(
                                                    error, RejectedExecutionException.class)
                                            .isPresent()) {
                                        LOG.debug("Execution rejected during shutdown");
                                    } else {
                                        LOG.warn("Error encountered during shutdown", error);
                                    }
                                    return null;
                                }));
    } catch (Throwable throwable) {
        onTriggerFailure(request, throwable);
    }
}

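This method is the key. I then added LOG statements to the source like crazy, after almost every line and inside every sub-method, rebuilt Flink, and deployed it to the test machine 10.0.11.24 (the only node kept for the experiment). Each build took about 20 minutes, and I went through N rounds.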

pendingCheckpointCompletableFuture
        .thenComposeAsync(
                (pendingCheckpoint) -> {
                    LOG.warn(
                            "pendingCheckpointCompletableFuture.thenComposeAsync >>>>>>>> pendingCheckpoint:{},Thread.currentThread():{}",
                            pendingCheckpoint,
                            Thread.currentThread().getId());
                    return OperatorCoordinatorCheckpoints
                            .triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion(
                                    coordinatorsToCheckpoint,
                                    pendingCheckpoint,
                                    timer);
                },
                timer);

Eventually I found that execution does enter org.apache.flink.runtime.checkpoint.OperatorCoordinatorCheckpoints#triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion, but the CompletableFuture it returns is never carried through by thenComposeAsync.

org.apache.flink.runtime.checkpoint.OperatorCoordinatorCheckpoints#triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion

public static CompletableFuture triggerAndAcknowledgeAllCoordinatorCheckpoints(
        final Collection coordinators,
        final PendingCheckpoint checkpoint,
        final Executor acknowledgeExecutor)
        throws Exception {
    LOG.warn(
            "triggerAndAcknowledgeAllCoordinatorCheckpoints >>>>> checkpoint:{}",
            checkpoint);
    try {
        final CompletableFuture snapshots =
                triggerAllCoordinatorCheckpoints(coordinators, checkpoint.getCheckpointId());
        LOG.warn(
                "triggerAndAcknowledgeAllCoordinatorCheckpoints >>>>> snapshots:{},acknowledgeExecutor:{}",
                snapshots,
                acknowledgeExecutor);
        // TODO: 2021/11/18 The code below never runs. To see whether an error was preventing
        // execution, the original thenAcceptAsync was replaced with handleAsync.
        return snapshots.handleAsync(
                (allSnapshots, err) -> {
                    if (err != null) {
                        LOG.error(
                                "snapshots.thenAcceptAsync >>>>> err",
                                err);
                    } else {
                        try {
                            LOG.warn(
                                    "snapshots.thenAcceptAsync >>>>> checkpoint:{}, allSnapshots.snapshots:{}",
                                    checkpoint,
                                    allSnapshots.snapshots);
                            acknowledgeAllCoordinators(checkpoint, allSnapshots.snapshots);
                        } catch (Exception e) {
                            LOG.error("snapshots.thenAcceptAsync >>>>> Exception", e);
                            throw new CompletionException(e);
                        }
                    }
                    return null;
                }, acknowledgeExecutor);
    } catch (Exception ex) {
        LOG.error(
                "triggerAndAcknowledgeAllCoordinatorCheckpoints >>>>> Exception",
                ex);
        throw ex;
    }
}

public static CompletableFuture triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion(
        final Collection coordinators,
        final PendingCheckpoint checkpoint,
        final Executor acknowledgeExecutor)
        throws CompletionException {
    try {
        LOG.warn(
                "OperatorCoordinatorCheckpoints.triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion >>>>> checkpoint:{}",
                checkpoint);
        return triggerAndAcknowledgeAllCoordinatorCheckpoints(
                coordinators, checkpoint, acknowledgeExecutor);
    } catch (Exception e) {
        LOG.error(
                "OperatorCoordinatorCheckpoints.triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion >>>>> Exception",
                e);
        throw new CompletionException(e);
    }
}

The CompletableFuture produced by the returned snapshots.thenAcceptAsync (shown here in its original, uninstrumented form) never executes, and it is unclear where it is stuck.

return snapshots.thenAcceptAsync(
        (allSnapshots) -> {
            try {
                acknowledgeAllCoordinators(checkpoint, allSnapshots.snapshots);
            } catch (Exception e) {
                throw new CompletionException(e);
            }
        },
        acknowledgeExecutor);
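The instrumented version swaps thenAcceptAsync for handleAsync so that a failure in snapshots would show up in the log. A minimal sketch of why that swap helps, using only java.util.concurrent (the class name HandleVsAccept and the helper callbacksInvokedOnFailure are made up for this illustration and are not part of Flink): when the upstream future fails, the thenAcceptAsync callback is skipped, whereas the handleAsync callback still runs and receives the error.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

public class HandleVsAccept {

    /** Returns {acceptCallbackRan, handleCallbackRan} for an upstream future that failed. */
    static boolean[] callbacksInvokedOnFailure() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture<String> failed = new CompletableFuture<>();
            failed.completeExceptionally(new IllegalStateException("boom"));

            AtomicBoolean acceptRan = new AtomicBoolean(false);
            AtomicBoolean handleRan = new AtomicBoolean(false);

            // Skipped when the upstream future completed exceptionally.
            CompletableFuture<Void> viaAccept =
                    failed.thenAcceptAsync(value -> acceptRan.set(true), executor);

            // Always invoked; err carries the upstream failure.
            CompletableFuture<Void> viaHandle =
                    failed.handleAsync(
                            (value, err) -> {
                                handleRan.set(true);
                                return null;
                            },
                            executor);

            viaHandle.join(); // completes normally because the handler swallowed the error
            try {
                viaAccept.join(); // rethrows the upstream failure
            } catch (CompletionException expected) {
                // expected: the failure propagates past the skipped callback
            }
            return new boolean[] {acceptRan.get(), handleRan.get()};
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        boolean[] result = callbacksInvokedOnFailure();
        System.out.println("thenAcceptAsync callback ran: " + result[0]);
        System.out.println("handleAsync callback ran: " + result[1]);
    }
}
```

In this investigation neither callback produced any log output, which pointed away from a swallowed exception and toward the callback never being scheduled at all.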

Note that in the log output, the thread name on these lines is always "Checkpoint Timer".

2021-11-19 08:34:29,880 INFO  Checkpoint Timer org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Triggering checkpoint 1 (type=SAVEPOINT) @ 1637282069853 for job 9e97360ca975514b4a91369b05431267.

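Use Arthas on the test machine to see what this "Checkpoint Timer" thread is doing.

# Switch to the yarn user: the Flink process is started by yarn, so attaching as any other user is not allowed

su yarn

# Note: the yarn user cannot log in by default; edit /etc/passwd so that its entry reads yarn:x:985:984:Hadoop Yarn:/var/lib/hadoop-yarn:/bin/bash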

yarn@node24:/tmp$ java -jar arthas-boot.jar

[INFO] arthas-boot version: 3.5.3

[INFO] Found existing java process, please choose one and input the serial number of the process, eg : 1. Then hit ENTER.

* [1]: 4176317 org.apache.flink.yarn.YarnTaskExecutorRunner

  [2]: 2775993 org.apache.hadoop.yarn.server.nodemanager.NodeManager

  [3]: 4176146 org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint

# Select the org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint process

3

[INFO] arthas home: /var/lib/hadoop-yarn/.arthas/lib/3.5.4/arthas

[INFO] Try to attach process 4176146

[INFO] Attach process 4176146 success.

[INFO] arthas-client connect 127.0.0.1 3658

  ,---.  ,------. ,--------.,--.  ,--.  ,---.   ,---.

 /  O  \ |  .--. ''--.  .--'|  '--'  | /  O  \ '   .-'

|  .-.  ||  '--'.'   |  |   |  .--.  ||  .-.  |`.  `-.

|  | |  ||  |\  \    |  |   |  |  |  ||  | |  |.-'    |

`--' `--'`--' '--'   `--'   `--'  `--'`--' `--'`-----'

wiki       https://arthas.aliyun.com/doc

tutorials  https://arthas.aliyun.com/doc/arthas-tutorials.html

version    3.5.4

main_class

pid        4176146

time       2021-11-19 09:16:06

# Find the thread named Checkpoint Timer

[arthas@4176146]$ thread -all | grep Checkpoint Timer

86      Checkpoint Timer                             main                   5               WAITING        0.0            0.000          0:0.020         false          true

# Inspect this thread

[arthas@4176146]$ thread 86

"Checkpoint Timer" Id=86 WAITING on java.util.concurrent.CompletableFuture$WaitNode@7dbca052

    at sun.misc.Unsafe.park(Native Method)

    -  waiting on java.util.concurrent.CompletableFuture$WaitNode@7dbca052

    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)

    at java.util.concurrent.CompletableFuture$WaitNode.block(CompletableFuture.java:271)

    at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3226)

    at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:319)

    at java.util.concurrent.CompletableFuture.access$000(CompletableFuture.java:111)

    at java.util.concurrent.CompletableFuture$AsyncCompose.exec(CompletableFuture.java:616)

    at java.util.concurrent.CompletableFuture$Async.run(CompletableFuture.java:428)

    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)

    at java.util.concurrent.FutureTask.run(FutureTask.java:266)

    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)

    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)

    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)

    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

    at java.lang.Thread.run(Thread.java:745)

[arthas@4176146]$

Sure enough, the thread is just sitting there, "WAITING on java.util.concurrent.CompletableFuture". Why is it waiting like this? Back to the source.

Inner:

return snapshots.thenAcceptAsync(
        (allSnapshots) -> {
            try {
                acknowledgeAllCoordinators(checkpoint, allSnapshots.snapshots);
            } catch (Exception e) {
                throw new CompletionException(e);
            }
        },
        acknowledgeExecutor);

Outer:

final CompletableFuture coordinatorCheckpointsComplete =
        pendingCheckpointCompletableFuture.thenComposeAsync(
                (pendingCheckpoint) ->
                        OperatorCoordinatorCheckpoints
                                .triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion(
                                        coordinatorsToCheckpoint,
                                        pendingCheckpoint,
                                        timer),
                timer);

Note that the stages inside and outside thenComposeAsync are given the same Executor. Tracing where this timer comes from leads to org.apache.flink.runtime.executiongraph.DefaultExecutionGraph#enableCheckpointing, at the point where the CheckpointCoordinator instance is created.

org.apache.flink.runtime.executiongraph.DefaultExecutionGraph#enableCheckpointing

checkpointCoordinatorTimer =
        Executors.newSingleThreadScheduledExecutor(
                new DispatcherThreadFactory(
                        Thread.currentThread().getThreadGroup(), "Checkpoint Timer"));
// create the coordinator that triggers and commits checkpoints and holds the state
checkpointCoordinator =
        new CheckpointCoordinator(
                jobInformation.getJobId(),
                chkConfig,
                operatorCoordinators,
                checkpointIDCounter,
                checkpointStore,
                checkpointStorage,
                ioExecutor,
                checkpointsCleaner,
                new ScheduledExecutorServiceAdapter(checkpointCoordinatorTimer),
                SharedStateRegistry.DEFAULT_FACTORY,
                failureManager,
                createCheckpointPlanCalculator(),
                new ExecutionAttemptMappingProvider(getAllExecutionVertices()));

The Executor named timer turns out to be a single-threaded executor created by Executors.newSingleThreadScheduledExecutor. Could the single thread be the problem? The executor has only one thread. The outer async stage is already occupying it, the inner async stage also needs a thread from the same executor, and there is none left.

I wrote my own example, but running it locally did not confirm the hypothesis: nothing hung. Not ready to give up, I searched online, and sure enough someone had asked the same question, and a hang is indeed possible:

How to use CompletableFuture.thenComposeAsync()?

I ran the example from that question locally and it still did not hang. Then it struck me: the problem only occurs in the test environment.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

public class CompletableFutureTest {
    public static void main(String[] args) {
        ScheduledExecutorService singleThreadExecutor = Executors.newSingleThreadScheduledExecutor();
        CompletableFuture.runAsync(() ->
        {
            System.out.println("Task 1. Thread: " + Thread.currentThread().getId());
        }, singleThreadExecutor).thenComposeAsync((Void unused) ->
        {
            return CompletableFuture.runAsync(() ->
            {
                System.out.println("Task 2. Thread: " + Thread.currentThread().getId());
            }, singleThreadExecutor);
        }, singleThreadExecutor).join();
        System.out.println("finished");
        // Release the executor's non-daemon thread so the JVM can exit once the chain completes.
        singleThreadExecutor.shutdown();

// My own example mimics Flink's checkpoint code more closely, but the problem lies solely
// in thenComposeAsync, so the two behave the same.
//        CompletableFuture cf = CompletableFuture
//                .supplyAsync(() -> "aaaaa", scheduledExecutorService)
//                .thenComposeAsync((str) -> {
//                    return CompletableFuture.runAsync(() -> {
//                        try {
//                            Thread.sleep(2000);
//                        } catch (InterruptedException e) {
//                            e.printStackTrace();
//                        }
//                        System.out.println(str.concat(" bbbbbbbbbbbb"));
//                    }, scheduledExecutorService);
//                }, scheduledExecutorService);
//        CompletableFuture
//                .allOf(cf.thenApplyAsync((str) -> {
//                    try {
//                        Thread.sleep(5000);
//                    } catch (InterruptedException e) {
//                        e.printStackTrace();
//                    }
//                    System.out.println("ddddddd");
//                    return "ccccc";
//                }, scheduledExecutorService), cf)
//                .handleAsync((str, err) -> {
//                    System.out.println(str);
//                    return null;
//                }, scheduledExecutorService).join();
//        scheduledExecutorService.shutdown();
    }
}
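Because the reproduction above hangs forever on an affected machine, a timeout-guarded variant can be more convenient when comparing hosts. The following is a sketch under that assumption; the class name ComposeProbe, the helper chainCompletes, and the 5-second timeout are illustrative choices, not anything taken from Flink or from the original question.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class ComposeProbe {

    /** Returns true if the outer/inner chain on one single-thread executor finishes in time. */
    static boolean chainCompletes(long timeoutMillis) throws Exception {
        ScheduledExecutorService single = Executors.newSingleThreadScheduledExecutor();
        try {
            // Same shape as the Flink code: outer stage and inner future share one executor.
            CompletableFuture<Void> chain =
                    CompletableFuture.runAsync(() -> {}, single)
                            .thenComposeAsync(
                                    unused -> CompletableFuture.runAsync(() -> {}, single),
                                    single);
            try {
                chain.get(timeoutMillis, TimeUnit.MILLISECONDS);
                return true;
            } catch (TimeoutException e) {
                return false; // the chain is stuck, as observed on the problem machine
            }
        } finally {
            // shutdownNow interrupts a stuck worker so the probe itself can exit.
            single.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(
                "java.version=" + System.getProperty("java.version")
                        + ", chain completes=" + chainCompletes(5000));
    }
}
```

Running it with the exact java binary that YARN uses gives a quick yes/no answer per host without leaving a hung process behind.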

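Then I ran it on 10.0.11.24, the problematic test machine, and sure enough it hung.

Then I ran it on 10.0.11.66, the machine without the problem.

Comparing JDK versions. Problem machine 24:

Healthy machine 66: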

 

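In other words, some older JDK versions have a problem where a CompletableFuture chain hangs when its Executor does not have enough threads, and newer JDK versions fix it. This matches the stack trace above: the only timer thread is parked in CompletableFuture$AsyncCompose.exec via waitingGet, blocked until the inner future completes, while the inner future is queued on that same thread.

Replace the JDK on problem machine 24.

That did not work, however: the JobManager log shows JAVA_HOME still pointing to the old JDK.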
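The mechanism can be shown on any JDK by making the blocking explicit. The sketch below is not the JDK's internal code; it emulates the pattern seen in the thread dump by having an outer task wait on an inner task submitted to the same single-thread executor. The class name SingleThreadStarvation, the helper outerTimesOut, and the timeout (added only so the demo terminates) are assumptions for illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SingleThreadStarvation {

    /** Returns true if the outer task gave up waiting for the inner task. */
    static boolean outerTimesOut(long waitMillis) throws Exception {
        ExecutorService single = Executors.newSingleThreadExecutor();
        try {
            Future<Boolean> outer =
                    single.submit(
                            () -> {
                                // The inner task is queued behind the outer task,
                                // which currently occupies the only worker thread.
                                Future<?> inner = single.submit(() -> {});
                                try {
                                    inner.get(waitMillis, TimeUnit.MILLISECONDS);
                                    return false; // would mean the inner task ran
                                } catch (TimeoutException e) {
                                    return true; // inner task never got a thread
                                }
                            });
            return outer.get();
        } finally {
            single.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("outer timed out waiting for inner: " + outerTimesOut(500));
    }
}
```

Without the timeout the outer task would wait indefinitely, which is the state the "Checkpoint Timer" thread was found in.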

2021-11-19 10:17:27,420 INFO  main org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] -  Starting YarnJobClusterEntrypoint (Version: 1.13.1, Scala: 2.11, Rev:a7f3192, Date:2021-05-25T12:02:11+02:00)

2021-11-19 10:17:27,421 INFO  main org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] -  OS current user: yarn

2021-11-19 10:17:27,712 INFO  main org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] -  Current Hadoop/Kerberos user: root

2021-11-19 10:17:27,714 INFO  main org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] -  JVM: Java HotSpot(TM) 64-Bit Server VM - Oracle Corporation - 1.8/25.20-b23

2021-11-19 10:17:27,715 INFO  main org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] -  Maximum heap size: 429 MiBytes

2021-11-19 10:17:27,715 INFO  main org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] -  JAVA_HOME: /usr/lib/jvm/j2sdk1.8-oracle

2021-11-19 10:17:27,718 INFO  main org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] -  Hadoop version: 3.0.0-cdh6.3.2

2021-11-19 10:17:27,718 INFO  main org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] -  JVM Options:

2021-11-19 10:17:27,718 INFO  main org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] -     -Xmx469762048

2021-11-19 10:17:27,718 INFO  main org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] -     -Xms469762048

2021-11-19 10:17:27,718 INFO  main org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] -     -XX:MaxMetaspaceSize=268435456

2021-11-19 10:17:27,719 INFO  main org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] -     -Dlog.file=/yarn/
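To check which JDK a given launcher actually resolves, independently of Flink, a tiny program run with the same java binary can print the standard system properties that correspond to the JVM and JAVA_HOME lines in the log above. This is only a sketch; the class name RuntimeJdkInfo and the helper describe are made up for illustration.

```java
public class RuntimeJdkInfo {

    /** Summarizes the JDK this process is actually running on. */
    static String describe() {
        return String.format(
                "java.version=%s, java.vm.version=%s, java.home=%s",
                System.getProperty("java.version"),
                System.getProperty("java.vm.version"),
                System.getProperty("java.home"));
    }

    public static void main(String[] args) {
        System.out.println(describe());
    }
}
```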

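I tried several approaches, but JAVA_HOME in the YARN environment still resolved to /usr/lib/jvm/j2sdk1.8-oracle, the old JDK. To see the effect quickly, I simply overwrote /usr/lib/jvm/j2sdk1.8-oracle with the new JDK's files and retried.

The savepoint succeeded!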

2021-11-19 10:48:44,377 DEBUG org.apache.flink.runtime.rest.RestClient                     [] - Received response {"status":{"id":"IN_PROGRESS"},"operation":null}.

2021-11-19 10:48:44,699 DEBUG org.apache.flink.runtime.rest.RestClient                     [] - Sending request of class class org.apache.flink.runtime.rest.messages.EmptyRequestBody to node24.qlteacher.com:8081/v1/jobs/ef1a9b56a942da679f0aaaf7602b710b/savepoints/3829b863721270f8b536a8ca8910d6e3

2021-11-19 10:48:44,716 DEBUG org.apache.flink.runtime.rest.RestClient                     [] - Received response {"status":{"id":"COMPLETED"},"operation":{"location":"hdfs://node21.qlteacher.com:8020/flink/cluster_yarn/savepoints/savepoint-ef1a9b-01be13ab205e"}}.

2021-11-19 10:48:44,718 INFO  org.apache.flink.client.cli.CliFrontend                      [] - Savepoint completed. Path: hdfs://node21.qlteacher.com:8020/flink/cluster_yarn/savepoints/savepoint-ef1a9b-01be13ab205e

Savepoint completed. Path: hdfs://node21.qlteacher.com:8020/flink/cluster_yarn/savepoints/savepoint-ef1a9b-01be13ab205e

2021-11-19 10:48:44,718 INFO  org.apache.flink.client.cli.CliFrontend                      [] - You can resume your program from this savepoint with the run command.

You can resume your program from this savepoint with the run command.

2021-11-19 10:48:44,721 DEBUG org.apache.flink.runtime.rest.RestClient                     [] - Shutting down rest endpoint.


